import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector



// ------------- Input records arrive with their own timestamps -------------
object ProcessFunctionScalaV2 {

  /**
   * Two-stage keyed aggregation that mitigates data skew via key salting:
   *
   *  1. Each incoming key is "salted" with a random 0-99 suffix, then counted
   *     per salted key in a 10-second tumbling window ([[CountAggregate]]),
   *     spreading one hot key across up to 100 parallel subtasks.
   *  2. The salt is stripped and the partial counts are re-keyed by the real
   *     key and merged ([[MyProcessFunction]]).
   *
   * Expects lines of the form "key,timestamp" from socket Desktop:9999.
   * NOTE(review): `timestamp` is parsed with `toLong` and will throw
   * NumberFormatException on malformed input — presumably acceptable for a demo.
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(2000) // checkpoint every 2 s

    val stream: DataStream[String] = env.socketTextStream("Desktop", 9999)

    // Parse "key,timestamp". Split each line ONCE (the original split twice
    // per record — once for each tuple element).
    val typeAndData: DataStream[(String, Long)] = stream.map { line =>
      val fields = line.split(",")
      (fields(0), fields(1).toLong)
    }

    // Salt the key with a random 0-99 suffix; the timestamp is unchanged.
    val dataStream: DataStream[(String, Long)] = typeAndData
      .map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))

    // Stage 1: aggregate per SALTED key in a 10-second tumbling window.
    val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .aggregate(new CountAggregate())
    keyByAgg.print("第一次keyby输出")

    // Stage 2: strip the "-NN" salt, re-key by the real key, and merge the
    // partial counts. substring/indexOf is safe here because every key was
    // salted with "-" in the map above.
    val result: DataStream[DataJast] = keyByAgg.map { data =>
      val realKey: String = data.key.substring(0, data.key.indexOf("-"))
      println(realKey)
      DataJast(realKey, data.count)
    }.keyBy(_.key)
      .process(new MyProcessFunction())
    result.print("第二次keyby输出")

    env.execute()
  }

  /** A (possibly partial) count for a key, passed between the two stages. */
  case class DataJast(key: String, count: Long)

}
 
 
// Code adapted from:
// https://blog.csdn.net/zhangshenghang/article/details/105322423